package backtype.storm.topology;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.generated.Bolt;
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.ComponentObject;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.Grouping;
import backtype.storm.generated.NullStruct;
import backtype.storm.generated.SpoutSpec;
import backtype.storm.generated.StateSpoutSpec;
import backtype.storm.generated.StormTopology;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

/**
 * TopologyBuilder exposes the Java API for specifying a topology for Storm to
 * execute. Topologies are Thrift structures in the end, but since the Thrift
 * API is so verbose, TopologyBuilder greatly eases the process of creating
 * topologies. The template for creating and submitting a topology looks
 * something like:
 * 
 * <pre>
 * TopologyBuilder builder = new TopologyBuilder();
 * 
 * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
 * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
 * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3)
 * 		.fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;))
 * 		.fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
 * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
 * 
 * Map conf = new HashMap();
 * conf.put(Config.TOPOLOGY_WORKERS, 4);
 * 
 * StormSubmitter.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
 * </pre>
 * 
 * Running the exact same topology in local mode (in process), and configuring
 * it to log all tuples emitted, looks like the following. Note that it lets the
 * topology run for 10 seconds before shutting down the local cluster.
 * 
 * <pre>
 * TopologyBuilder builder = new TopologyBuilder();
 * 
 * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
 * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
 * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3)
 * 		.fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;))
 * 		.fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
 * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
 * 
 * Map conf = new HashMap();
 * conf.put(Config.TOPOLOGY_WORKERS, 4);
 * conf.put(Config.TOPOLOGY_DEBUG, true);
 * 
 * LocalCluster cluster = new LocalCluster();
 * cluster.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
 * Utils.sleep(10000);
 * cluster.shutdown();
 * </pre>
 * 
 * <p>
 * The pattern for TopologyBuilder is to map component ids to components using
 * the setSpout and setBolt methods. Those methods return objects that are then
 * used to declare the inputs for that component.
 * </p>
 */
public class TopologyBuilder {
	private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
	private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
	private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();

	// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new
	// HashMap<String, Map<GlobalStreamId, Grouping>>();

	private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();

	public StormTopology createTopology() {
		Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
		Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
		for (String boltId : _bolts.keySet()) {
			IRichBolt bolt = _bolts.get(boltId);
			ComponentCommon common = getComponentCommon(boltId, bolt);
			boltSpecs.put(
					boltId,
					new Bolt(ComponentObject.serialized_java(Utils
							.serialize(bolt)), common));
		}
		for (String spoutId : _spouts.keySet()) {
			IRichSpout spout = _spouts.get(spoutId);
			ComponentCommon common = getComponentCommon(spoutId, spout);
			spoutSpecs.put(
					spoutId,
					new SpoutSpec(ComponentObject.serialized_java(Utils
							.serialize(spout)), common));

		}
		return new StormTopology(spoutSpecs, boltSpecs,
				new HashMap<String, StateSpoutSpec>());
	}

	/**
	 * Define a new bolt in this topology with parallelism of just one thread.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this bolt's outputs.
	 * @param bolt
	 *            the bolt
	 * @return use the returned object to declare the inputs to this component
	 */
	public BoltDeclarer setBolt(String id, IRichBolt bolt) {
		return setBolt(id, bolt, null);
	}

	/**
	 * Define a new bolt in this topology with the specified amount of
	 * parallelism.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this bolt's outputs.
	 * @param bolt
	 *            the bolt
	 * @param parallelism_hint
	 *            the number of tasks that should be assigned to execute this
	 *            bolt. Each task will run on a thread in a process somewhere
	 *            around the cluster.
	 * @return use the returned object to declare the inputs to this component
	 */
	public BoltDeclarer setBolt(String id, IRichBolt bolt,
			Number parallelism_hint) {
		validateUnusedId(id);
		initCommon(id, bolt, parallelism_hint);
		_bolts.put(id, bolt);
		return new BoltGetter(id);
	}

	/**
	 * Define a new bolt in this topology. This defines a basic bolt, which is a
	 * simpler to use but more restricted kind of bolt. Basic bolts are intended
	 * for non-aggregation processing and automate the anchoring/acking process
	 * to achieve proper reliability in the topology.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this bolt's outputs.
	 * @param bolt
	 *            the basic bolt
	 * @return use the returned object to declare the inputs to this component
	 */
	public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
		return setBolt(id, bolt, null);
	}

	/**
	 * Define a new bolt in this topology. This defines a basic bolt, which is a
	 * simpler to use but more restricted kind of bolt. Basic bolts are intended
	 * for non-aggregation processing and automate the anchoring/acking process
	 * to achieve proper reliability in the topology.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this bolt's outputs.
	 * @param bolt
	 *            the basic bolt
	 * @param parallelism_hint
	 *            the number of tasks that should be assigned to execute this
	 *            bolt. Each task will run on a thread in a process somwehere
	 *            around the cluster.
	 * @return use the returned object to declare the inputs to this component
	 */
	public BoltDeclarer setBolt(String id, IBasicBolt bolt,
			Number parallelism_hint) {
		return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
	}

	/**
	 * Define a new spout in this topology.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this spout's outputs.
	 * @param spout
	 *            the spout
	 */
	public SpoutDeclarer setSpout(String id, IRichSpout spout) {
		return setSpout(id, spout, null);
	}

	/**
	 * Define a new spout in this topology with the specified parallelism. If
	 * the spout declares itself as non-distributed, the parallelism_hint will
	 * be ignored and only one task will be allocated to this component.
	 * 
	 * @param id
	 *            the id of this component. This id is referenced by other
	 *            components that want to consume this spout's outputs.
	 * @param parallelism_hint
	 *            the number of tasks that should be assigned to execute this
	 *            spout. Each task will run on a thread in a process somwehere
	 *            around the cluster.
	 * @param spout
	 *            the spout
	 */
	public SpoutDeclarer setSpout(String id, IRichSpout spout,
			Number parallelism_hint) {
		validateUnusedId(id);
		initCommon(id, spout, parallelism_hint);
		_spouts.put(id, spout);
		return new SpoutGetter(id);
	}

	public void setStateSpout(String id, IRichStateSpout stateSpout) {
		setStateSpout(id, stateSpout, null);
	}

	public void setStateSpout(String id, IRichStateSpout stateSpout,
			Number parallelism_hint) {
		validateUnusedId(id);
		// TODO: finish
	}

	private void validateUnusedId(String id) {
		if (_bolts.containsKey(id)) {
			throw new IllegalArgumentException(
					"Bolt has already been declared for id " + id);
		}
		if (_spouts.containsKey(id)) {
			throw new IllegalArgumentException(
					"Spout has already been declared for id " + id);
		}
		if (_stateSpouts.containsKey(id)) {
			throw new IllegalArgumentException(
					"State spout has already been declared for id " + id);
		}
	}

	private ComponentCommon getComponentCommon(String id, IComponent component) {
		ComponentCommon ret = new ComponentCommon(_commons.get(id));

		OutputFieldsGetter getter = new OutputFieldsGetter();
		component.declareOutputFields(getter);
		ret.set_streams(getter.getFieldsDeclaration());
		return ret;
	}

	private void initCommon(String id, IComponent component, Number parallelism) {
		ComponentCommon common = new ComponentCommon();
		common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
		if (parallelism != null)
			common.set_parallelism_hint(parallelism.intValue());
		else {
		    common.set_parallelism_hint(Integer.valueOf(1));
		}
		Map conf = component.getComponentConfiguration();
		if (conf != null)
			common.set_json_conf(Utils.to_json(conf));
		_commons.put(id, common);
	}

	protected class ConfigGetter<T extends ComponentConfigurationDeclarer>
			extends BaseConfigurationDeclarer<T> {
		String _id;

		public ConfigGetter(String id) {
			_id = id;
		}

		@Override
		public T addConfigurations(Map conf) {
			if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
				throw new IllegalArgumentException(
						"Cannot set serializations for a component using fluent API");
			}
			String currConf = _commons.get(_id).get_json_conf();
			_commons.get(_id).set_json_conf(
					mergeIntoJson(parseJson(currConf), conf));
			return (T) this;
		}
	}

	protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements
			SpoutDeclarer {
		public SpoutGetter(String id) {
			super(id);
		}
	}

	protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements
			BoltDeclarer {
		private String _boltId;

		public BoltGetter(String boltId) {
			super(boltId);
			_boltId = boltId;
		}

		public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
			return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
		}

		public BoltDeclarer fieldsGrouping(String componentId, String streamId,
				Fields fields) {
			return grouping(componentId, streamId,
					Grouping.fields(fields.toList()));
		}

		public BoltDeclarer globalGrouping(String componentId) {
			return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer globalGrouping(String componentId, String streamId) {
			return grouping(componentId, streamId,
					Grouping.fields(new ArrayList<String>()));
		}

		public BoltDeclarer shuffleGrouping(String componentId) {
			return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
			return grouping(componentId, streamId,
					Grouping.shuffle(new NullStruct()));
		}

		public BoltDeclarer localOrShuffleGrouping(String componentId) {
			return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer localOrShuffleGrouping(String componentId,
				String streamId) {
			return grouping(componentId, streamId,
					Grouping.local_or_shuffle(new NullStruct()));
		}
		
		@Override
		public BoltDeclarer localFirstGrouping(String componentId) {
			return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		@Override
		public BoltDeclarer localFirstGrouping(String componentId,
				String streamId) {
			return grouping(componentId, streamId,
					Grouping.localFirst(new NullStruct()));
		}

		public BoltDeclarer noneGrouping(String componentId) {
			return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer noneGrouping(String componentId, String streamId) {
			return grouping(componentId, streamId,
					Grouping.none(new NullStruct()));
		}

		public BoltDeclarer allGrouping(String componentId) {
			return allGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer allGrouping(String componentId, String streamId) {
			return grouping(componentId, streamId,
					Grouping.all(new NullStruct()));
		}

		public BoltDeclarer directGrouping(String componentId) {
			return directGrouping(componentId, Utils.DEFAULT_STREAM_ID);
		}

		public BoltDeclarer directGrouping(String componentId, String streamId) {
			return grouping(componentId, streamId,
					Grouping.direct(new NullStruct()));
		}

		private BoltDeclarer grouping(String componentId, String streamId,
				Grouping grouping) {
			_commons.get(_boltId).put_to_inputs(
					new GlobalStreamId(componentId, streamId), grouping);
			return this;
		}

		@Override
		public BoltDeclarer customGrouping(String componentId,
				CustomStreamGrouping grouping) {
			return customGrouping(componentId, Utils.DEFAULT_STREAM_ID,
					grouping);
		}

		@Override
		public BoltDeclarer customGrouping(String componentId, String streamId,
				CustomStreamGrouping grouping) {
			return grouping(componentId, streamId,
					Grouping.custom_serialized(Utils.serialize(grouping)));
		}

		@Override
		public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
			return grouping(id.get_componentId(), id.get_streamId(), grouping);
		}

		
	}

	private static Map parseJson(String json) {
		if (json == null)
			return new HashMap();
		else
			return (Map) Utils.from_json(json);
	}

	private static String mergeIntoJson(Map into, Map newMap) {
		Map res = new HashMap(into);
		if (newMap != null)
			res.putAll(newMap);
		return Utils.to_json(res);
	}
}
